Skip to content

Improvement: keep order-preserving repartitions for streaming aggregates#21107

Open
xudong963 wants to merge 3 commits intoapache:mainfrom
xudong963:agg_order
Open

Improvement: keep order-preserving repartitions for streaming aggregates#21107
xudong963 wants to merge 3 commits intoapache:mainfrom
xudong963:agg_order

Conversation

@xudong963
Copy link
Member

@xudong963 xudong963 commented Mar 23, 2026

Which issue does this PR close?

  • Closes #.

Rationale for this change

What changes are included in this PR?

This PR updates EnforceDistribution to keep order-preserving repartition variants when preserving input ordering allows the parent operator to remain incremental/streaming.

Previously, order-preserving variants could be removed when prefer_existing_sort = false or when there was no explicit ordering requirement, even if dropping the ordering would force a parent operator such as AggregateExec to fall back to blocking execution. This change adds a targeted preserving_order_enables_streaming check and uses it to avoid replacing RepartitionExec(..., preserve_order=true) / SortPreservingMergeExec when that preserved ordering is what enables streaming behavior.

As a result, the optimizer now prefers keeping order-preserving repartitioning in these cases, and the updated sqllogictests reflect the new physical plans: instead of inserting a SortExec above a plain repartition, plans now retain RepartitionExec(... preserve_order=true) so sorted or partially sorted aggregates can continue running incrementally.

Are these changes tested?

Are there any user-facing changes?

No extra sort needed for these cases

@xudong963 xudong963 marked this pull request as draft March 23, 2026 05:36
@github-actions github-actions bot added optimizer Optimizer rules sqllogictest SQL Logic Tests (.slt) labels Mar 23, 2026
@xudong963 xudong963 changed the title Fix/aggregate output ordering streaming (#33) Fix/aggregate output ordering streaming Mar 23, 2026
@github-actions github-actions bot added the core Core DataFusion crate label Mar 23, 2026
@xudong963 xudong963 added the performance Make DataFusion faster label Mar 23, 2026
@xudong963 xudong963 marked this pull request as ready for review March 23, 2026 09:33
@xudong963 xudong963 requested a review from alamb March 23, 2026 09:33
@alamb
Copy link
Contributor

alamb commented Mar 24, 2026

Previously, order-preserving variants could be removed when prefer_existing_sort = false or when there was no explicit ordering requirement, even if dropping the ordering would force a parent operator such as AggregateExec to fall back to blocking execution.

I am not sure this is a "bug" necessarily -- more like a tradeoff. I believe the enforce_distribution plan will attempt to increase plan parallelism even if it has to resort by default

My understanding is that this is what the prefer_existing_sort setting controls

prefer_existing_sort = false

So if you want plans to keep existing sorts and not increase parallelism in that case, you should set prefer_existing_sort = true -- this is what we do in InfluxDB FWIW

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @xudong963 -- this change looks good to me. As I understand it it avoids adding a HashRepartitioning (and uses a more memory efficient operator) so that sounds like a win all around

I was worried that this change would result in trading off "more sortedness" for "less parallelism" but from what I can see that is not the case.

One thing we may want to consider it gating this behavior behind the "prefer existing sort" flag -- that way

I don't think we should be ignoring the errors, but otherwise the code and tests look good to me.

10)------------------BoundedWindowAggExec: wdw=[row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Field { "row_number() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING": UInt64 }, frame: ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING], mode=[Sorted]
11)--------------------LazyMemoryExec: partitions=1, batch_generators=[range: start=1, end=5, batch_size=8192]
03)----RepartitionExec: partitioning=Hash([generated_id@0], 4), input_partitions=4, preserve_order=true, sort_exprs=generated_id@0 ASC NULLS LAST
04)------AggregateExec: mode=Partial, gby=[generated_id@0 as generated_id], aggr=[array_agg(unnested.ar)], ordering_mode=Sorted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this plan looks better to me (it still is fully parallelized and now uses avoids an unecessary hash repartition

// Parent is blocking even with ordering — no benefit
return false;
}
// Build parent with an unordered child (simulating CoalescePartitionsExec)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this commet is strange to me -- the code adds a CoalescePartitionsExec -- so I don't think it is "simulating" anything

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah, it's legecy

let with_ordered =
match Arc::clone(parent).with_new_children(vec![Arc::clone(ordered_child)]) {
Ok(p) => p,
Err(_) => return false,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should ignore the Err here or below as it could mask real errors / a bug with this code

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree!

xudong963 and others added 3 commits March 25, 2026 17:37
Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
@xudong963
Copy link
Member Author

@alamb thanks for the review, this PR is not trading parallelism for sortedness, and it does not remove the hash repartition.

The plan still uses the same Hash repartition with the same partition count; the only change is that we keep the order-preserving variant when removing it would cause the parent AggregateExec (in Sorted / PartiallySorted mode) to switch from incremental to blocking execution.

@xudong963 xudong963 changed the title Fix/aggregate output ordering streaming Improvement: keep order-preserving repartitions for streaming aggregates Mar 25, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Core DataFusion crate optimizer Optimizer rules performance Make DataFusion faster sqllogictest SQL Logic Tests (.slt)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants